热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

golang入门项目—日志收集

传统ELK架构的日志收集:存在的问题:Logstash耗资源较大,运行占用CPU和内存高。另外没有消息队列缓存,存在数据丢失隐患。适用于小规模的集群使用。第二种架构:位于各个节点上

传统ELK架构的日志收集:

存在的问题:Logstash耗资源较大,运行占用CPU和内存高。另外没有消息队列缓存,存在数据丢失隐患。适用于小规模的集群使用。

第二种架构:

位于各个节点上的Log Agent先将数据/日志传递给Kafka,并将队列中消息或数据交由Log Transfer,传递给Elasticsearch存储。最后由Kibana将日志和数据呈现给用户。因为引入了Kafka,数据会先被存储下来,所以即使Logstash server因故障停止运行,数据也不会丢失。这种架构适合于较大集群使用

各组件介绍:
LogAgent:日志收集客户端,用来收集服务器上的日志
Kafka:高吞吐量的分布式队列(Linkin开发,apache顶级开源项目),消息队列和日志存储。
ElasticSearch:开源的搜索引擎,提供介于HTTP RESTful的web接口
Kibana:开源的ES数据分析和可视化工具。
Hadoop:分布式计算框架,能够对大量数据进行分布式处理的平台。
Storm:一个免费并开源的分布式实时计算系统
引用链接

Kafka和tailf的参考链接

Zookeeper:ZooKeeper是一种分布式协调服务,用于管理大型主机。在分布式环境中协调和管理服务是一个复杂的过程。ZooKeeper通过其简单的架构和API解决了这个问题。
Zookeeper扮演红色角色

ElasticSearch:是一个分布式可扩展的实时搜索和分析引擎,一个建立在全文搜索引擎 Apache Lucene™ 基础上的搜索引擎.当然 Elasticsearch 并不仅仅是 Lucene 那么简单,它不仅包括了全文搜索功能,还可以进行以下工作:
分布式实时文件存储,并将每一个字段都编入索引,使其可以被搜索。
实时分析的分布式搜索引擎。
可以扩展到上百台服务器,处理PB级别的结构化或非结构化数据。

Kibana是一个开源的分析与可视化平台,设计出来用于和Elasticsearch一起使用的。你可以用kibana搜索、查看存放在Elasticsearch中的数据。Kibana与Elasticsearch的交互方式是各种不同的图表、表格、地图等,直观的展示数据,从而达到高级的数据分析与可视化的目的。
Elasticsearch、Logstash和Kibana这三个技术就是我们常说的ELK技术栈,可以说这三个技术的组合是大数据领域中一个很巧妙的设计。一种很典型的MVC思想,模型持久层,视图层和控制层。Logstash担任控制层的角色,负责搜集和过滤数据。Elasticsearch担任数据持久层的角色,负责储存数据。而我们这章的主题Kibana担任视图层角色,拥有各种维度的查询和分析,并使用图形化的界面展示存放在Elasticsearch中的数据。

etcd 是一个分布式键值对存储系统,由coreos 开发,内部采用 raft 协议作为一致性算法,用于可靠、快速地保存关键数据,并提供访问。通过分布式锁、leader选举和写屏障(write barriers),来实现可靠的分布式协作。etcd集群是为高可用、持久化数据存储和检索而准备。
etcd架构图:

 

 

源码:

logagent包

config.ini

[kafka]
address=127.0.0.1:9092
chan_max_size=100000

[etcd]
address=127.0.0.1:2379
timeout=5
collect_log_key=/log/%s/collect_config

[taillog]
filename=./my.log
timeout=5
1
2
3
4
5
6
7
8
9
10
11
12
config.go

package conf

type AppConf struct {
KafkaConf `ini:"kafka"`
EtcdConf `ini:"etcd"`
}

type KafkaConf struct {
Address string `ini:"address"`
ChanMaxSize int `ini:"chan_max_size"`
}

type EtcdConf struct {
Address string `ini:"address"`
Key string `ini:"collect_log_key"`
Timeout int `ini:"timeout"`
}

type TaillogConf struct {
Filename string `ini:"filename"`
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
etcd.go

package etcd

import (
"context"
"encoding/json"
"fmt"
"time"

"go.etcd.io/etcd/clientv3"
)

var (
cli *clientv3.Client
)

//需要收集的日志的配置信息
type LogEntry struct {
Path string `json:"path"`
Topic string `json:"topic"`
}

//初始化etcd的函数
func Init(addr string, timeout time.Duration) {
var err error
cli, err = clientv3.New(clientv3.Config{
Endpoints: []string{addr},
DialTimeout: timeout,
})
if err != nil {
fmt.Println("connect to etcd success")
return
}
fmt.Println("connect to etcd success")
}

//从etcd中获取根据key配置项
func GetConf(key string) (LogEntryConf []*LogEntry, err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, key)
cancel()
if err != nil {
fmt.Println("put to etcd failed,err:", err)
return
}
for _, ev := range resp.Kvs {
err = json.Unmarshal(ev.Value, &LogEntryConf)
if err != nil {
fmt.Println("unmarshal etcd value failed,err:", err)
return
}
fmt.Printf("value:%s\n", ev.Value)
}
return
}

//etcd_watch
func WatchConf(key string, newConfCh chan<- []*LogEntry) {
ch := cli.Watch(context.Background(), key)
//从通道尝试取值(监视的信息)
for wresp := range ch {
for _, evt := range wresp.Events {
fmt.Printf("Type:%v key:%v value:%v\n", evt.Type, string(evt.Kv.Key), string(evt.Kv.Value))
//通知taillog.tskMgr
//1.先判断操作的类型
var newConf []*LogEntry
// if evt.Type != clientv3.EventTypeDelete{
// //如果是删除操作,手动传递一个空的配置项
// err := json.Unmarshal(evt.Kv.Value, &newConf)
// if err != nil {
// fmt.Println("unmarshal failed,err:", err)
// continue
// }
err := json.Unmarshal(evt.Kv.Value, &newConf)
if err != nil {
fmt.Println("unmarshal failed,err:", err)
continue
}
fmt.Println("get new conf:", newConf)
newConfCh <- newConf
}

}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
etcd_put.go

package main

import (
"context"
"fmt"
"time"

"go.etcd.io/etcd/clientv3"
)

func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"}, //节点
DialTimeout: 5 * time.Second, //超过5秒钟连不上超时
})
if err != nil {
fmt.Println("connect to etcd failed:", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
value := `[{"path":"d:/tmp/nginx.log","topic":"web_log"},{"path":"d:/xxx/redis.log","topic":"redis_log"},{"path":"d:/xxx/mysql.log","topic":"mysql_log"}]`
_, err = cli.Put(ctx, "/log/192.168.1.7/collect_config", value)
cancel()
if err != nil {
fmt.Println("put to etcd failed,err:", err)
return
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
kafka.go

package kafka

//log Agent入口
import (
"fmt"
"time"

"github.com/Shopify/sarama"
)

type logData struct {
topic string
data string
}

var (
client sarama.SyncProducer //声明一个全局的连接kafka的生产者client
logDataChan chan *logData
)

//初始化client
func Init(addrs []string, maxSize int) (err error) {
config := sarama.NewConfig()
//tailf包使用
config.Producer.RequiredAcks = sarama.WaitForAll //发送完数据需要leader和follow都确认
config.Producer.PartitiOner= sarama.NewRandomPartitioner //新选出一个partition
config.Producer.Return.Successes = true //成功交付的消息将在success channel返回
//连接kafka
client, err = sarama.NewSyncProducer(addrs, config)
if err != nil {
fmt.Println("producer closed,err:", err)
return
}
fmt.Println("连接kafka成功!")
if err != nil {
fmt.Println("send msg failed,err:", err)
return
}
//初始化logDataChan
logDataChan = make(chan *logData, maxSize)
//开启后台的goroutine从通道中取数据发往kafka
go SendToKarfka()
return
}

//给外部暴露的一个函数,该函数只把日志数据发送到一个内部的channel中
func SendToChan(topic, data string) {
msg := &logData{
topic: topic,
data: data,
}
logDataChan <- msg
}

//真正往kafka发送日志的函数
func SendToKarfka() {
for {
select {
case ld := <-logDataChan:
//构造一个消息
msg := &sarama.ProducerMessage{}
msg.Topic = ld.topic
msg.Value = sarama.StringEncoder(ld.data)
//发送到kafka
pid, offset, err := client.SendMessage(msg) //offset是写成功的文件的索引位置
if err != nil {
fmt.Println("send msg failed,err:", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
default:
time.Sleep(time.Millisecond * 50)
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
tail.go

package taillog

import (
"context"
"fmt"
"test/log/kafka"

"github.com/hpcloud/tail"
)

var (
tailObj *tail.Tail
LogChan chan string
)

//TailTask:一个日志收集的任务
type TailTask struct {
path string
topic string
instance *tail.Tail
//为了能够实现退出t.run()
ctx context.Context
cancelFunc context.CancelFunc
}

func NewTailTask(path, topic string) (tailObj *TailTask) {
ctx, cancel := context.WithCancel(context.Background())
tailObj = &TailTask{
path: path,
topic: topic,
ctx: ctx,
cancelFunc: cancel,
}
tailObj.init() //根据路径去打开对应的日志
return
}

func (t *TailTask) init() {
config := tail.Config{
ReOpen: true, //重新打开
Follow: true, //是否跟随
Location: &tail.SeekInfo{Offset: 0, Whence: 2}, //从文件哪个地方开始读
MustExist: false, //文件不存在不报错
Poll: true,
}
var err error
t.instance, err = tail.TailFile(t.path, config)
if err != nil {
fmt.Println("tail file failed,err:", err)
}
go t.run() //直接去采集日志发送到kafka
}

func (t *TailTask) run() {
for {
select {
case <-t.ctx.Done():
fmt.Printf("tail tast:%v_%s finish...\n", t.path, t.topic)
return
case line := <-t.instance.Lines: //从tailObj的通道中一行一行的读取日志数据
// kafka.SendToKarfka(t.topic, line.Text) //函数调函数
//先把日志数据发到一个通道中
kafka.SendToChan(t.topic, line.Text)
//kafka那个包中有单独的goroutine去取日志数据发到kafka

}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
tail_mgr.go

package taillog

import (
"fmt"
"test/log/etcd"
"time"
)

var tskMgr *taillogMgr

//tailTask 管理者
type taillogMgr struct {
logEntry []*etcd.LogEntry
tskMap map[string]*TailTask
newConfChan chan []*etcd.LogEntry
}

func Init(logEntryConf []*etcd.LogEntry) {
tskMgr = &taillogMgr{
logEntry: logEntryConf, //把当前的日志收集配置信息保存起来
tskMap: make(map[string]*TailTask, 16),
newConfChan: make(chan []*etcd.LogEntry), //无缓冲区的通道
}
for _, LogEntry := range logEntryConf {
//conf:*etcd.LogEntry
//logEntry.Path:要收集的日志文件的路径
//初始化的时候起了多少个tailtask都要记下来,为了后续判断方便
tailObj := NewTailTask(LogEntry.Path, LogEntry.Topic)
mk := fmt.Sprintf("%s_%s", LogEntry.Path, LogEntry.Topic)
tskMgr.tskMap[mk] = tailObj
}
go tskMgr.run()
}

//监听自己的newConfChan,有了新的配置过来之后就做对应的处理
func (t *taillogMgr) run() {
for {
select {
case newConf := <-t.newConfChan:
fmt.Println("新的配置来了!", newConf)
for _, conf := range newConf {
mk := fmt.Sprintf("%s_%s", conf.Path, conf.Topic)
_, ok := t.tskMap[mk]
if ok {
//原来就有,不需要操作
continue
} else {
//新增的
tailObj := NewTailTask(conf.Path, conf.Topic)
t.tskMap[mk] = tailObj
}
}
//找出原来t.tskMap有,但是newConf中没有的,要删除
for _, c1 := range t.logEntry { //从原来的配置中依次拿出配置项
isDelete := true
for _, c2 := range newConf { //去新的配置中逐一进行比较
if c2.Path == c1.Path && c2.Topic == c1.Topic {
isDelete = false
continue
}
}
if isDelete {
//把c1对应的tailObj给停掉
mk := fmt.Sprintf("%s_%s", c1.Path, c1.Topic)
t.tskMap[mk].cancelFunc()
}
}
//1.配置新增
//2.配置删除
//3.配置变更
default:
time.Sleep(time.Second)
}
}
}

//一个函数,向外暴露tskMgr的newConfChan
func NewConfChan() chan<- []*etcd.LogEntry {
return tskMgr.newConfChan
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
ip.go

package utils

import (
"net"
"strings"
)

//GetOutboundIP 获取本地对外IP

func GetOutboundIP() (ip string, err error) {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
return
}
defer conn.Close()
localAddr := conn.LocalAddr().(*net.UDPAddr)
ip = strings.Split(localAddr.IP.String(), ":")[0]
return
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
main.go

package main

import (
"fmt"
"sync"
"test/log/conf"
"test/log/etcd"
"test/log/kafka"
"test/log/taillog"
"test/log/utils"
"time"

"gopkg.in/ini.v1"
)

var (
cfg = new(conf.AppConf)
)

func main() {
//0.加载配置文件
err := ini.MapTo(cfg, "./conf/config.ini")
if err != nil {
println("load ini failed,err:", err)
return
}
//1.初始化一个kafka连接
err = kafka.Init([]string{cfg.KafkaConf.Address}, cfg.KafkaConf.ChanMaxSize)
if err != nil {
fmt.Println("init kafka failed,err:", err)
return
}
fmt.Println("初始化成功!")
//2.初始化etcd
etcd.Init(cfg.EtcdConf.Address, time.Duration(cfg.EtcdConf.Timeout)*time.Second)
if err != nil {
fmt.Println("init etcd failed,err:", err)
return
}
//为了实现每个logagent都拉取自己独有的配置,所以要以自己的IP地址作为区分
ipStr, err := utils.GetOutboundIP()
if err != nil {
panic(err)
}
etcdConfKey := fmt.Sprintf(cfg.EtcdConf.Key, ipStr)
fmt.Printf("etcdConfKey:%s\n", etcdConfKey)
//2.1 从etcd中获取日志收集项的配置信息
logEntryConf, err := etcd.GetConf(etcdConfKey)
if err != nil {
fmt.Println("etcd.GetConf failed,err:", err)
return
}
fmt.Println("get conf from etcd success:", logEntryConf)
//2.2 派一个哨兵去监视日志收集项的变化(有变化及时通知我的logAgent实现加载配置)

for index, value := range logEntryConf {
fmt.Printf("index:%v value:%v\n", index, value)
}
fmt.Println("init etcd success.")

//3.收集日志发往Kafka
//3.1 循环每一个日志收集项,创建一个TailObj
taillog.Init(logEntryConf)
//因为NewConfChan访问了tskMgr的newConfChan,这个channel是在taillog.Init(logEntryConf)执行的初始化
newConfChan := taillog.NewConfChan() //从taillog包中获取对外暴露的通道
var wg sync.WaitGroup
wg.Add(1)
go etcd.WatchConf(etcdConfKey, newConfChan) //哨兵发现最新的配置信息会通知上面的那个通道
wg.Wait()
//3.2发往Kafka

//4.打开日志文件准备收集日志
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
log_transfer包

cfg.ini

[kafka]
address=127.0.0.1:9092
topic=web_log

[es]
address=127.0.0.1:9200
size=100000

1
2
3
4
5
6
7
8
cfg.go

package conf

//LogTransfer 全局配置
type Logtransfer struct {
KafkaCfg `ini:"kafka"`
ESCfg `ini:"es"`
}

//Kafka...
type KafkaCfg struct {
Address string `ini:"address"`
Topic string `ini:"topic"`
}

//ESCfg
type ESCfg struct {
Address string `ini:"address"`
ChanSize int `ini:"size"`
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
es.go

package es

import (
"context"
"fmt"
"strings"
"time"

"github.com/olivere/elastic/v7"
)

//初始化ES,准备接收kafka那边发来的数据

type LogData struct {
Topic string `json:"topic"`
Data string `json:"data"`
}

var (
client *elastic.Client
ch chan *LogData
)

//init...
func Init(address string, chanSize int) (err error) {
if !strings.HasPrefix(address, "http://") {
address = "http://" + address
}
client, err = elastic.NewClient(elastic.SetURL(address))
if err != nil {
return
}
fmt.Println("connect to es success")
ch = make(chan *LogData, chanSize)
go SendToES()
return
}

// func SendToESChan(d *LogData) (err error) {
// msg := &LogData{}
// msg.Topic = d.Topic
// msg.Data = string(d.Data)

// _, err = client.Index().
// Index(d.Topic).
// BodyJson(msg).
// Do(context.Background())
// if err != nil {
// panic(err)
// }
// return
// }

func SendToESChan(msg *LogData) {
ch <- msg
}

//发送数据到ES
func SendToES() {
//链式操作
for {
select {
case msg := <-ch:
put1, err := client.Index().
Index(msg.Topic). //Index表数据库
Type("xxx").
BodyJson(msg). //把一个go语言的对象转换为json格式
Do(context.Background())
if err != nil {
fmt.Println(err)
}
fmt.Printf("Indexed %s to index %s,type %s\n", put1.Id, put1.Index, put1.Type)
default:
time.Sleep(time.Second)
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
kafka.go

package kafka

import (
"fmt"
"test/log_transfer/es"

"github.com/Shopify/sarama"
)

//LogData...
type LogData struct {
Data string `json:"data"`
}

//初始化kafka消费者,从kafka取数据发往ES
func Init(addr []string, topic string) (err error) {
consumer, err := sarama.NewConsumer(addr, nil)
if err != nil {
fmt.Printf("fail to start consumer,err:%v\n", err)
return
}
partitionList, err := consumer.Partitions(topic) //根据topic取到所有的分区
if err != nil {
fmt.Println("fail to get list of partition:", err)
return
}
var pc sarama.PartitionConsumer
fmt.Println(partitionList)
for partition := range partitionList {
pc, err = consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Printf("failed to start consumer for partition %d,err:%v", partition, err)
return
}
defer pc.AsyncClose()
//异步从每个分区消费消息
go func(sarama.PartitionConsumer) {
for msg := range pc.Messages() {
fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
//直接发给ES
var ld = es.LogData{
Topic: topic,
Data: string(msg.Value),
}
es.SendToESChan(&ld) //函数调函数
//优化一下,直接放到chann中
}
}(pc)
select {}
}
return
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
main.go

package main

import (
"fmt"
"test/log_transfer/conf"
"test/log_transfer/es"
"test/log_transfer/kafka"

"gopkg.in/ini.v1"
)

//log transfer
//将日志数据从kafka取出来发往ES

func main() {
//0.加载配置文件
var cfg conf.Logtransfer
err := ini.MapTo(&cfg, "./conf/cfg.ini")
if err != nil {
fmt.Println("init config err:", err)
return
}
fmt.Printf("cfg:%v\n", cfg)
//1.初始化ES
//1.1 初始化一个ES连接的client
//1.2 对外提供y一个往ES写入数据的一个函数
err = es.Init(cfg.ESCfg.Address, cfg.ESCfg.ChanSize)
if err != nil {
fmt.Println("init ES consumer failed,err:", err)
return
}
fmt.Println("init es success.")
//2.初始化Kafka
//2.1 连接kafka,创建分区的消费者
//2.2 每个分区的消费者分别取出数据,通过sendToES()将数据发往ES
err = kafka.Init([]string{cfg.KafkaCfg.Address}, cfg.KafkaCfg.Topic)
if err != nil {
fmt.Println("init kafka consumer failed,err:", err)
return
}
//3.从kafka中取数据

//4.发往ES
select {}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
自己编写版):

config包:

config.ini

[kafka]
address=127.0.0.1:9092
chan_max_size=100000

[etcd]
address=127.0.0.1:2379
timeout=5
log_key=/log/collect_config

[es]
address=127.0.0.1:9200
size=100000
1
2
3
4
5
6
7
8
9
10
11
12
config.go

package config

import (
"context"

"github.com/hpcloud/tail"
)

type AppConf struct {
KafkaConf `ini:"kafka"`
EtcdConf `ini:"etcd"`
ESConf `ini:"es"`
}

type KafkaConf struct {
Address string `ini:"address"`
Max_size int `ini:"chan_max_size"`
}

type EtcdConf struct {
Address string `ini:"address"`
Timeout int `ini:"timeout"`
Log_key string `ini:"log_key"`
}

type ESConf struct {
Address string `ini:"address"`
Max_size int `ini:"size"`
}

type LogConf struct {
Path string `ini:"path"`
Topic string `ini:"topic"`
}

type LogEntryConf []*LogConf

type TailTask struct {
Path string
Topic string
Instance *tail.Tail
Ctx context.Context
CancelF context.CancelFunc
}

type LogData struct {
Topic string
Data string
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
es包

es.go

package es

import (
"context"
"fmt"
"test/mylog/config"
"time"

"github.com/olivere/elastic/v7"
)

var (
client *elastic.Client
ESchan chan *config.LogData
)

func Init(address string, size int) (err error) {
client, err = elastic.NewClient(elastic.SetURL(address))
if err != nil {
fmt.Println("Init ES failed,err:", err)
return
}
ESchan = make(chan *config.LogData, 1000)
go SendToES()
return
}

func SendToESChan(msg *config.LogData) {
ESchan <- msg
fmt.Println("sssss")
}

func SendToES() {
for {
select {
case msg := <-ESchan:
put1, err := client.Index().Index(msg.Topic).BodyJson(msg).Do(context.Background())
if err != nil {
panic(err)
}
fmt.Printf("Index user:%s to index %s,type:%s\n", put1.Id, put1.Index, put1.Type)
default:
time.Sleep(time.Second)
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
etcd包

etcd.go

package etcd

import (
"context"
"encoding/json"
"fmt"
"time"

"test/mylog/config"
"test/mylog/tail"

"go.etcd.io/etcd/clientv3"
)

var (
client *clientv3.Client
logdata config.LogEntryConf
)

func Init(address []string, timeout int) (err error) {
client, err = clientv3.New(clientv3.Config{
Endpoints: address,
DialTimeout: time.Duration(timeout) * time.Second,
})
if err != nil {
fmt.Println("connect to etcd failed,err:\n", err)
return
}
fmt.Println("connect to etcd success!")
return
}

func GetConf(key string) (logconf config.LogEntryConf, err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
var resp *clientv3.GetResponse
resp, err = client.Get(ctx, key)
cancel()
if err != nil {
fmt.Println("get from etcd failed,err:", err)
return
}
for _, ev := range resp.Kvs {
err = json.Unmarshal(ev.Value, &logconf)
if err != nil {
fmt.Println("Unmarshal json failed:", err)
return
}
}
return
}

func WatchConf(topic string) {
rch := client.Watch(context.Background(), topic)
channel := tail.Get_chan()
for wresp := range rch {
for _, ev := range wresp.Events {
err := json.Unmarshal(ev.Kv.Value, &logdata)
if err != nil {
fmt.Println("Update conf failed,err:", err)
return
}
fmt.Println("update config success:", ev.Kv.Value)
channel <- logdata
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
kafka包

kafka.go

package kafka

import (
"fmt"
"test/mylog/config"
"test/mylog/es"

"github.com/Shopify/sarama"
)

var (
client sarama.SyncProducer
logDataChan chan *config.LogData
consumer sarama.Consumer
pc sarama.PartitionConsumer
)

func Init(address []string, max_size int) (err error) {
cfg := sarama.NewConfig()
cfg.Producer.RequiredAcks = sarama.WaitForAll
cfg.Producer.PartitiOner= sarama.NewRandomPartitioner
cfg.Producer.Return.Successes = true

client, err = sarama.NewSyncProducer(address, cfg)
if err != nil {
fmt.Println("Produce error:", err)
return
}

logDataChan = make(chan *config.LogData, max_size)

consumer, err = sarama.NewConsumer(address, nil)
if err != nil {
fmt.Println("Init consumer failed,err:", err)
return
}

go SendMessage()

return
}

func SendToChan(topic, data string) {
var t = &config.LogData{
Topic: topic,
Data: data,
}
logDataChan <- t
}

func SendMessage() {
for {
select {
case ld := <-logDataChan:
msg := sarama.ProducerMessage{}
msg.Topic = ld.Topic
msg.Value = sarama.StringEncoder(ld.Data)
pid, offset, err := client.SendMessage(&msg)
if err != nil {
fmt.Println("Send Message error:", err)
}
fmt.Printf("pid:%v offser:%v Topic:%v Value:%v\n", pid, offset, ld.Topic, ld.Data)
default:
}
}
}

func Consumer(topic string) {
partitionList, err := consumer.Partitions(topic)
if err != nil {
fmt.Println("Get partitions failed,err:", err)
return
}
for partition := range partitionList {
pc, err = consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Println("failed to start consumer for partition,err:", err)
return
}
}
defer pc.AsyncClose()
go func(sarama.PartitionConsumer) {
for msg := range pc.Messages() {
fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
t := &config.LogData{
Topic: topic,
Data: string(msg.Value),
}
es.SendToESChan(t)
}
}(pc)
select {}
}

// func Consumer(topic string) (err error) {
// partitionList, err := consumer.Partitions(topic) //根据topic取到所有的分区
// if err != nil {
// fmt.Println("fail to get list of partition:", err)
// return
// }
// var pc sarama.PartitionConsumer
// fmt.Println(partitionList)
// for partition := range partitionList {
// pc, err = consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
// if err != nil {
// fmt.Printf("failed to start consumer for partition %d,err:%v", partition, err)
// return
// }
// defer pc.AsyncClose()
// //异步从每个分区消费消息
// go func(sarama.PartitionConsumer) {
// for msg := range pc.Messages() {
// fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
// //直接发给ES
// var ld = config.LogData{
// Topic: topic,
// Data: string(msg.Value),
// }
// es.SendToESChan(&ld) //函数调函数
// //优化一下,直接放到chann中
// }
// }(pc)
// select {}
// }
// return
// }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
tail包

tail.go

package tail

import (
"context"
"fmt"
"test/mylog/config"
"test/mylog/kafka"

"github.com/hpcloud/tail"
)

type Tasks config.TailTask

var (
tails *tail.Tail
tasks_map map[string]*config.TailTask
tasks_chan chan config.LogEntryConf
)

func run(T *config.TailTask) {
for {
select {
case <-T.Ctx.Done():
return
case line := <-T.Instance.Lines:
kafka.SendToChan(T.Topic, line.Text)
}
}
}

func Init(Tvalue config.LogEntryConf) error {
tasks_map = make(map[string]*config.TailTask, 100)
tasks_chan = make(chan config.LogEntryConf)
for _, value := range Tvalue {
base := config.LogConf{
Path: value.Path,
Topic: value.Topic,
}
Task, err := NewTask(base)
name := fmt.Sprintf("%s\\%s", value.Path, value.Topic)
tasks_map[name] = Task
if err != nil {
fmt.Println("Init tail failed,err:", err)
return err
}
go run(Task)
}
go Update_Task()
return nil
}

func Update_Task() {
for {
select {
case new_tasks := <-tasks_chan:
for _, old_task := range tasks_map {
name := fmt.Sprintf("%s\\%s", old_task.Path, old_task.Topic)
tasks_map[name].CancelF()
}
for _, new_task := range new_tasks {
name := fmt.Sprintf("%s\\%s", new_task.Path, new_task.Topic)
Task, err := NewTask(*new_task)
if err != nil {
fmt.Println("init task err:", err)
return
}
tasks_map[name] = Task
}
}
}
}

func Get_chan() chan config.LogEntryConf {
return tasks_chan
}

func NewTask(base config.LogConf) (tal *config.TailTask, err error) {
cfg := tail.Config{
ReOpen: true,
Follow: true,
Location: &tail.SeekInfo{Offset: 0, Whence: 2},
MustExist: false,
Poll: true,
}
ctx, cancel := context.WithCancel(context.Background())
tails, err = tail.TailFile(base.Path, cfg)
if err != nil {
fmt.Println("tail file failed,err:", err)
}
tal = &config.TailTask{
Path: base.Path,
Topic: base.Topic,
Instance: tails,
Ctx: ctx,
CancelF: cancel,
}
return
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
main.go

package main

import (
"fmt"
"sync"
"test/mylog/config"
"test/mylog/es"
"test/mylog/etcd"
"test/mylog/tail"

"test/mylog/kafka"

"gopkg.in/ini.v1"
)

var wg sync.WaitGroup

func main() {
var cfg config.AppConf
err := ini.MapTo(&cfg, "./config/config.ini")
if err != nil {
fmt.Println("Decode Map failed!", err)
}
err = kafka.Init([]string{cfg.KafkaConf.Address}, cfg.KafkaConf.Max_size)
if err != nil {
fmt.Println("init kafka failed", err)
return
}
fmt.Println("init kafka success!")

err = etcd.Init([]string{cfg.EtcdConf.Address}, cfg.EtcdConf.Timeout)
var path config.LogEntryConf
path, err = etcd.GetConf(cfg.Log_key)
if err != nil {
return
}
tail.Init(path)
es.Init(cfg.ESConf.Address, cfg.ESConf.Max_size)
for index, value := range path {
fmt.Printf("index:%v value:%v topic:%v\n", index, value, value.Topic)
kafka.Consumer(value.Topic)
}
wg.Add(1)
etcd.WatchConf(cfg.EtcdConf.Log_key)
wg.Done()
}

 



推荐阅读
  • Web开发框架概览:Java与JavaScript技术及框架综述
    Web开发涉及服务器端和客户端的协同工作。在服务器端,Java是一种优秀的编程语言,适用于构建各种功能模块,如通过Servlet实现特定服务。客户端则主要依赖HTML进行内容展示,同时借助JavaScript增强交互性和动态效果。此外,现代Web开发还广泛使用各种框架和库,如Spring Boot、React和Vue.js,以提高开发效率和应用性能。 ... [详细]
  • 在当今的软件开发领域,分布式技术已成为程序员不可或缺的核心技能之一,尤其在面试中更是考察的重点。无论是小微企业还是大型企业,掌握分布式技术对于提升工作效率和解决实际问题都至关重要。本周的Java架构师实战训练营中,我们深入探讨了Kafka这一高效的分布式消息系统,它不仅支持发布订阅模式,还能在高并发场景下保持高性能和高可靠性。通过实际案例和代码演练,学员们对Kafka的应用有了更加深刻的理解。 ... [详细]
  • 利用ZFS和Gluster实现分布式存储系统的高效迁移与应用
    本文探讨了在Ubuntu 18.04系统中利用ZFS和Gluster文件系统实现分布式存储系统的高效迁移与应用。通过详细的技术分析和实践案例,展示了这两种文件系统在数据迁移、高可用性和性能优化方面的优势,为分布式存储系统的部署和管理提供了宝贵的参考。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 本文推荐了六款高效的Java Web应用开发工具,并详细介绍了它们的实用功能。其中,分布式敏捷开发系统架构“zheng”项目,基于Spring、Spring MVC和MyBatis技术栈,提供了完整的分布式敏捷开发解决方案,支持快速构建高性能的企业级应用。此外,该工具还集成了多种中间件和服务,进一步提升了开发效率和系统的可维护性。 ... [详细]
  • RocketMQ在秒杀时的应用
    目录一、RocketMQ是什么二、broker和nameserver2.1Broker2.2NameServer三、MQ在秒杀场景下的应用3.1利用MQ进行异步操作3. ... [详细]
  • 本文详细介绍了Java代码分层的基本概念和常见分层模式,特别是MVC模式。同时探讨了不同项目需求下的分层策略,帮助读者更好地理解和应用Java分层思想。 ... [详细]
  • 秒建一个后台管理系统?用这5个开源免费的Java项目就够了
    秒建一个后台管理系统?用这5个开源免费的Java项目就够了 ... [详细]
  • 阿里巴巴终面技术挑战:如何利用 UDP 实现 TCP 功能?
    在阿里巴巴的技术面试中,技术总监曾提出一道关于如何利用 UDP 实现 TCP 功能的问题。当时回答得不够理想,因此事后进行了详细总结。通过与总监的进一步交流,了解到这是一道常见的阿里面试题。面试官的主要目的是考察应聘者对 UDP 和 TCP 在原理上的差异的理解,以及如何通过 UDP 实现类似 TCP 的可靠传输机制。 ... [详细]
  • 本文详细介绍了在MySQL中如何高效利用EXPLAIN命令进行查询优化。通过实例解析和步骤说明,文章旨在帮助读者深入理解EXPLAIN命令的工作原理及其在性能调优中的应用,内容通俗易懂且结构清晰,适合各水平的数据库管理员和技术人员参考学习。 ... [详细]
  • 本文深入探讨了NoSQL数据库的四大主要类型:键值对存储、文档存储、列式存储和图数据库。NoSQL(Not Only SQL)是指一系列非关系型数据库系统,它们不依赖于固定模式的数据存储方式,能够灵活处理大规模、高并发的数据需求。键值对存储适用于简单的数据结构;文档存储支持复杂的数据对象;列式存储优化了大数据量的读写性能;而图数据库则擅长处理复杂的关系网络。每种类型的NoSQL数据库都有其独特的优势和应用场景,本文将详细分析它们的特点及应用实例。 ... [详细]
  • REST与RPC:选择哪种API架构风格?
    在探讨REST与RPC这两种API架构风格的选择时,本文首先介绍了RPC(远程过程调用)的概念。RPC允许客户端通过网络调用远程服务器上的函数或方法,从而实现分布式系统的功能调用。相比之下,REST(Representational State Transfer)则基于资源的交互模型,通过HTTP协议进行数据传输和操作。本文将详细分析两种架构风格的特点、适用场景及其优缺点,帮助开发者根据具体需求做出合适的选择。 ... [详细]
  • Python 实战:异步爬虫(协程技术)与分布式爬虫(多进程应用)深入解析
    本文将深入探讨 Python 异步爬虫和分布式爬虫的技术细节,重点介绍协程技术和多进程应用在爬虫开发中的实际应用。通过对比多进程和协程的工作原理,帮助读者理解两者在性能和资源利用上的差异,从而在实际项目中做出更合适的选择。文章还将结合具体案例,展示如何高效地实现异步和分布式爬虫,以提升数据抓取的效率和稳定性。 ... [详细]
  • php从哪里得到框架(2023年最新分享)
    导读:本篇文章编程笔记来给大家介绍有关php从哪里得到框架的相关内容,希望对大家有所帮助,一起来看看吧。本文目录一览:1、如何用PHP制 ... [详细]
  • 乐观锁_乐观锁 VS 悲观锁
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了乐观锁VS悲观锁相关的知识,希望对你有一定的参考价值。 ... [详细]
author-avatar
JRamboKing
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有